/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLConnection;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.IFile.InMemoryReader;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.iterative.LoopReduceCacheFilter;
import org.apache.hadoop.mapred.iterative.LoopReduceCacheSwitch;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/** A reduce task that recovers reducer state (the HaLoop cache) after a failure. */
class RecoverReducerTask extends Task {

	static { // register a ctor
		WritableFactories.setFactory(RecoverReducerTask.class,
				new WritableFactory() {
					public Writable newInstance() {
						return new RecoverReducerTask();
					}
				});
	}
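	// The factory registered above lets the framework materialize this task
	// via WritableFactories.newInstance(RecoverReducerTask.class) when it is
	// deserialized from the wire.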

	private static final Log LOG = LogFactory.getLog(RecoverReducerTask.class
			.getName());
	private int numMaps;
	private ReduceCopier reduceCopier;

	private CompressionCodec codec;

	{
		getProgress().setStatus("recover");
		setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
	}

	private Progress copyPhase;
	private Progress sortPhase;
	private Progress reducePhase;
	private Counters.Counter reduceInputKeyCounter = getCounters().findCounter(
			Counter.REDUCE_INPUT_GROUPS);
	private Counters.Counter reduceInputValueCounter = getCounters()
			.findCounter(Counter.REDUCE_INPUT_RECORDS);
	private Counters.Counter reduceOutputCounter = getCounters().findCounter(
			Counter.REDUCE_OUTPUT_RECORDS);
	private Counters.Counter reduceCombineOutputCounter = getCounters()
			.findCounter(Counter.COMBINE_OUTPUT_RECORDS);
	private Counters.Counter reduceShuffleBytes = getCounters().findCounter(
			Counter.REDUCE_SHUFFLE_BYTES);

	/**
	 * HaLoop: the iteration to be recovered
	 */
	private int recoverIteration;

	/**
	 * HaLoop: the step to be recovered
	 */
	private int recoverStep;

	/**
	 * HaLoop: loop cache switch
	 */
	private LoopReduceCacheSwitch loopCacheControl;

	/**
	 * HaLoop: filter deciding which records to cache and which to skip
	 */
	private LoopReduceCacheFilter loopCacheFilter;

	// A custom comparator for map output files. Ordering is determined by
	// file size and path. Files of the same size but different paths compare
	// as if the first argument were smaller; files with the same size and
	// path are considered equal.
	private Comparator<FileStatus> mapOutputFileComparator = new Comparator<FileStatus>() {
		public int compare(FileStatus a, FileStatus b) {
			if (a.getLen() < b.getLen())
				return -1;
			else if (a.getLen() == b.getLen())
				if (a.getPath().toString().equals(b.getPath().toString()))
					return 0;
				else
					return -1;
			else
				return 1;
		}
	};
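	// Note: because equal-sized files with different paths compare as -1
	// rather than 0, the TreeSet below treats them as distinct entries, so
	// mapOutputFilesOnDisk can hold several outputs of the same size while
	// still iterating in increasing-size order.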

	// A sorted set for keeping a set of map output files on disk
	private final SortedSet<FileStatus> mapOutputFilesOnDisk = new TreeSet<FileStatus>(
			mapOutputFileComparator);

	public RecoverReducerTask() {
		super();
	}

	/**
	 * recovery: the latest cache written iteration map schedule
	 */
	List<MapScheduleInfo> mapSchedules = new ArrayList<MapScheduleInfo>();

	public RecoverReducerTask(String jobFile, TaskAttemptID taskId,
			int partition, int numMaps) {
		super(jobFile, taskId, partition);
		this.numMaps = numMaps;
	}

	/**
	 * HaLoop: reload the per-step job configuration from the schedule log
	 * directory
	 */
	private void loadJobConf() {
		try {
			FileSystem fs = FileSystem.get(conf);
			Path path = new Path(MRConstants.SCHEDULE_LOG_DIR + "/"
					+ this.getJobID() + "/conf.job");
			FSDataInputStream in = fs.open(path);
			JobConf job = new JobConf();
			job.readFields(in);

			int num = job.getNumberOfLoopBodySteps();
			for (int i = 0; i < num; i++)
				conf.setStepConf(i, job.getStepConf(i));
			in.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * recover from schedule log
	 */
	private void recoverMappersFromScheduleLog() throws IOException,
			InterruptedException, ClassNotFoundException {

		FileSystem fs = FileSystem.get(conf);
		Path path = new Path(MRConstants.SCHEDULE_LOG_DIR + "/"
				+ getJobID().toString() + "/schedule.log");
		FSDataInputStream scheduleLog = fs.open(path);

		mapSchedules.clear();
		while (scheduleLog.available() > 0) {
			MapScheduleInfo msi = new MapScheduleInfo();
			msi.readFields(scheduleLog);
			mapSchedules.add(msi);
		}

		scheduleLog.close();

		/**
		 * num of maps
		 */
		this.numMaps = mapSchedules.size();

		InetAddress addr = InetAddress.getLocalHost();
		String hostname = addr.getHostName();

		List<MapScheduleInfo> recoverMappers = new ArrayList<MapScheduleInfo>();

		String httpAddress = conf.get("mapred.task.tracker.http.address");
		String[] ipAndPort = httpAddress.split(":");

		System.out.println("current host " + hostname);
		System.out.println("recover for host "
				+ this.getRecoverFromTaskTracker());

		if (this.getNodeFailure()) {
			for (MapScheduleInfo msi : mapSchedules) {
				/**
				 * find tasktracker to be recovered
				 */
				if (msi.getHttpHost()
						.contains(this.getRecoverFromTaskTracker())
						|| this.getRecoverFromTaskTracker().contains(
								msi.getHttpHost())) {
					recoverMappers.add(msi);
					/**
					 * replace it with a new http address
					 */
					msi.setHttpAddress("http://" + hostname + ":"
							+ ipAndPort[1]);
					// System.out.println("recover from " + msi.getHttpHost());
				}
			}
		}
		/**
		 * find the most recent (iteration, step) at which the reduce cache
		 * was written, walking backwards from the current round
		 */
		int numSteps = conf.getNumberOfLoopBodySteps();
		int cachedIteration = iteration;
		int cachedStep = step;
		for (int latest = round; latest >= 0; latest--) {
			if (cachedStep > 0) {
				cachedStep--;
			} else {
				cachedStep = numSteps - 1;
				cachedIteration--;
			}
			if (loopCacheControl.isCacheWritten(conf, cachedIteration,
					cachedStep))
				break;
		}
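		// For example, with numSteps = 3 and a failure at iteration 2 /
		// step 1, the walk visits (2, 0), (1, 2), (1, 1), ... and stops at
		// the first (iteration, step) for which isCacheWritten() is true.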

		this.recoverIteration = cachedIteration;
		this.recoverStep = cachedStep;

		this.iteration = recoverIteration;
		this.step = recoverStep;
		this.round = recoverIteration * numOfLoopBodySteps + recoverStep;

		TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
		reporter.startCommunicationThread();
		boolean useNewApi = conf.getUseNewReducer();

		/**
		 * set the recovered iteration
		 */
		conf.setCurrentIterationAndStep(iteration, step);

		if (this.getNodeFailure()) {
			int numConcurrentMappers = 10;
			JobConf mapConf = conf.duplicate();
			// split the sort buffer (io.sort.mb) across the concurrent
			// local recovery mappers
			mapConf.setInt("io.sort.mb", 100 / numConcurrentMappers);

			Thread[] mthreads = new Thread[numConcurrentMappers];
			int i = 0;
			int num = recoverMappers.size();
			for (i = 0; i < num;) {
				// execute mappers in batch
				int j = 0;
				for (; j < numConcurrentMappers && i < num; j++) {
					MapScheduleInfo msi = recoverMappers.get(i);
					MapTask mt = new MapTask(getJobFile(),
							msi.getTaskAttemptID(), msi.getPartition(),
							msi.getInputSplit().getClassName(),
							msi.getInputSplit().getBytes());
					// run map task locally to recover
					mt.setConf(mapConf);
					mt.setCurrentIteration(iteration);
					mt.setCurrentStep(step);
					mt.setRound();
					mt.initialize(mapConf, this.getJobID(), reporter,
							useNewApi);

					mthreads[j] = new Thread(new MapperRecoverThread(mapConf,
							umbilical, mt));
					mthreads[j].start();
					i++;
				}

				for (int k = 0; k < j; k++) {
					Thread thread = mthreads[k];
					try {
						thread.join();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}

			System.out.flush();
			System.out.println("recover mappers are finished");

			/**
			 * overwrite the scheduling log
			 */
			FSDataOutputStream jobLog = fs.create(path);
			for (MapScheduleInfo msi : mapSchedules) {
				msi.write(jobLog);
			}
			jobLog.close();
			System.out.println("recover schedule log are finished");
		}

	}

	class MapperRecoverThread implements Runnable {

		private MapTask mt;
		private JobConf job;
		private TaskUmbilicalProtocol umbilical;

		public MapperRecoverThread(JobConf job,
				TaskUmbilicalProtocol umbilical, MapTask mt) {
			this.mt = mt;
			this.job = job;
			this.umbilical = umbilical;
		}

		@Override
		public void run() {
			try {
				mt.run(job, umbilical);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	private CompressionCodec initCodec() {
		// check if map-outputs are to be compressed
		if (conf.getCompressMapOutput()) {
			Class<? extends CompressionCodec> codecClass = conf
					.getMapOutputCompressorClass(DefaultCodec.class);
			return ReflectionUtils.newInstance(codecClass, conf);
		}
		return null;
	}
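	// Usage sketch: initCodec() returns a codec only when map-output
	// compression was enabled on the job, e.g. via the standard JobConf
	// knobs (shown here for illustration):
	//
	//   conf.setCompressMapOutput(true);
	//   conf.setMapOutputCompressorClass(DefaultCodec.class);
	//
	// otherwise it returns null and fetched map outputs are read as-is.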

	@Override
	public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
			throws IOException {
		return new ReduceTaskRunner(tip, tracker, this.conf);
	}

	@Override
	public boolean isMapTask() {
		return false;
	}

	public int getNumMaps() {
		return numMaps;
	}

	/**
	 * Localize the given JobConf to be specific for this task.
	 */
	@Override
	public void localizeConfiguration(JobConf conf) throws IOException {
		super.localizeConfiguration(conf);
		conf.setNumMapTasks(numMaps);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		super.write(out);
		out.writeInt(numMaps); // write the number of maps
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		super.readFields(in);
		numMaps = in.readInt();
	}

	// Get the input files for the reducer.
	private Path[] getMapFiles(FileSystem fs, boolean isLocal)
			throws IOException {
		List<Path> fileList = new ArrayList<Path>();
		if (isLocal) {
			// for local jobs
			for (int i = 0; i < numMaps; ++i) {
				fileList.add(mapOutputFile.getInputFile(i, getTaskID(), round));
			}
		} else {
			// for non local jobs
			for (FileStatus filestatus : mapOutputFilesOnDisk) {
				fileList.add(filestatus.getPath());
			}
		}
		return fileList.toArray(new Path[0]);
	}

	private class ReduceValuesIterator<KEY, VALUE> extends
			ValuesIterator<KEY, VALUE> {
		public ReduceValuesIterator(RawKeyValueIterator in,
				RawComparator<KEY> comparator, Class<KEY> keyClass,
				Class<VALUE> valClass, Configuration conf, Progressable reporter)
				throws IOException {
			super(in, comparator, keyClass, valClass, conf, reporter);
		}

		protected VALUE moveToNext() {
			return super.next();
		}

	}

	private class SkippingReduceValuesIterator<KEY, VALUE> extends
			ReduceValuesIterator<KEY, VALUE> {
		private SkipRangeIterator skipIt;
		private TaskUmbilicalProtocol umbilical;
		private Counters.Counter skipGroupCounter;
		private Counters.Counter skipRecCounter;
		private long grpIndex = -1;
		private Class<KEY> keyClass;
		private Class<VALUE> valClass;
		private SequenceFile.Writer skipWriter;
		private boolean toWriteSkipRecs;
		private boolean hasNext;
		private TaskReporter reporter;

		public SkippingReduceValuesIterator(RawKeyValueIterator in,
				RawComparator<KEY> comparator, Class<KEY> keyClass,
				Class<VALUE> valClass, Configuration conf,
				TaskReporter reporter, TaskUmbilicalProtocol umbilical)
				throws IOException {
			super(in, comparator, keyClass, valClass, conf, reporter);
			this.umbilical = umbilical;
			this.skipGroupCounter = reporter
					.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
			this.skipRecCounter = reporter
					.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
			this.toWriteSkipRecs = toWriteSkipRecs()
					&& SkipBadRecords.getSkipOutputPath(conf) != null;
			this.keyClass = keyClass;
			this.valClass = valClass;
			this.reporter = reporter;
			skipIt = getSkipRanges().skipRangeIterator();
			mayBeSkip();
		}

		void nextKey() throws IOException {
			super.nextKey();
			mayBeSkip();
		}

		public boolean more() {
			return super.more() && hasNext;
		}

		private void mayBeSkip() throws IOException {
			hasNext = skipIt.hasNext();
			if (!hasNext) {
				LOG.warn("Further groups got skipped.");
				return;
			}
			grpIndex++;
			long nextGrpIndex = skipIt.next();
			long skip = 0;
			long skipRec = 0;
			while (grpIndex < nextGrpIndex && super.more()) {
				while (hasNext()) {
					VALUE value = moveToNext();
					if (toWriteSkipRecs) {
						writeSkippedRec(getKey(), value);
					}
					skipRec++;
				}
				super.nextKey();
				grpIndex++;
				skip++;
			}

			// close the skip writer once all the ranges are skipped
			if (skip > 0 && skipIt.skippedAllRanges() && skipWriter != null) {
				skipWriter.close();
			}
			skipGroupCounter.increment(skip);
			skipRecCounter.increment(skipRec);
			reportNextRecordRange(umbilical, grpIndex);
		}
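		// Example: if the bad-record ranges cover group indices 3..5, the
		// skip iterator's next() jumps from 2 straight to 6; the loop above
		// consumes groups 3..5 without invoking reduce, optionally recording
		// them via writeSkippedRec, and then reports the next record range.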

		@SuppressWarnings("unchecked")
		private void writeSkippedRec(KEY key, VALUE value) throws IOException {
			if (skipWriter == null) {
				Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
				Path skipFile = new Path(skipDir, getTaskID().toString());
				skipWriter = SequenceFile.createWriter(skipFile
						.getFileSystem(conf), conf, skipFile, keyClass,
						valClass, CompressionType.BLOCK, reporter);
			}
			skipWriter.append(key, value);
		}
	}

	@Override
	@SuppressWarnings("unchecked")
	public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
			throws IOException, InterruptedException, ClassNotFoundException {
		job = conf;
		if (isMapOrReduce()) {
			copyPhase = getProgress().addPhase("copy");
			sortPhase = getProgress().addPhase("sort");
			reducePhase = getProgress().addPhase("reduce");
		}

		this.loadJobConf();

		long start = System.currentTimeMillis();

		this.umbilical = umbilical;
		job.setBoolean("mapred.skip.on", isSkipping());

		// start thread that will handle communication with parent
		TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
		reporter.startCommunicationThread();
		boolean useNewApi = job.getUseNewReducer();
		initialize(job, getJobID(), reporter, useNewApi);

		// check if it is a cleanupJobTask
		if (jobCleanup) {
			runJobCleanupTask(umbilical, reporter);
			return;
		}
		if (jobSetup) {
			runJobSetupTask(umbilical, reporter);
			return;
		}
		if (taskCleanup) {
			runTaskCleanupTask(umbilical, reporter);
			return;
		}

		loopCacheControl = ReflectionUtils.newInstance(conf
				.getLoopReduceCacheSwitch(), conf);
		loopCacheFilter = ReflectionUtils.newInstance(conf
				.getLoopReduceCacheFilter(), conf);

		// Initialize the codec
		codec = initCodec();

		/**
		 * HaLoop: recover from schedule log
		 */
		// if (this.getNodeFailure())
		recoverMappersFromScheduleLog();

		/**
		 * HaLoop: initialize loopCacheControl
		 */
		if (job.isIterative() && loopCacheControl == null) {
			Class<? extends LoopReduceCacheSwitch> cacheControl = job
					.getLoopReduceCacheSwitch();
			loopCacheControl = ReflectionUtils.newInstance(cacheControl, job);
		}

		/**
		 * HaLoop: set up loop filter
		 */
		if (job.isIterative() && loopCacheFilter == null) {
			Class<? extends LoopReduceCacheFilter> cacheFilter = job
					.getLoopReduceCacheFilter();
			loopCacheFilter = ReflectionUtils.newInstance(cacheFilter, job);
		}
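		// A minimal sketch of a cache switch, assuming LoopReduceCacheSwitch
		// exposes the isCacheWritten(JobConf, iteration, step) hook used
		// throughout this class (hypothetical subclass, for illustration
		// only):
		//
		//   public class LastStepCacheSwitch extends LoopReduceCacheSwitch {
		//       // write the reducer cache only on the last step of each
		//       // iteration
		//       public boolean isCacheWritten(JobConf conf, int iteration,
		//               int step) {
		//           return step == conf.getNumberOfLoopBodySteps() - 1;
		//       }
		//   }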

		boolean isLocal = "local"
				.equals(job.get("mapred.job.tracker", "local"));
		if (!isLocal) {
			reduceCopier = new ReduceCopier(umbilical, job, reporter);
			if (!reduceCopier.fetchOutputs()) {
				if (reduceCopier.mergeThrowable instanceof FSError) {
					throw (FSError) reduceCopier.mergeThrowable;
				}
				throw new IOException("Task: " + getTaskID()
						+ " - The reduce copier failed",
						reduceCopier.mergeThrowable);
			}
		}

		System.out.println("recover reduce copy finished");

		final FileSystem rfs = FileSystem.getLocal(job).getRaw();
		RawKeyValueIterator rIter = isLocal ? Merger.merge(job, rfs, job
				.getMapOutputKeyClass(), job.getMapOutputValueClass(), codec,
				getMapFiles(rfs, true), !conf.getKeepFailedTaskFiles(), job
						.getInt("io.sort.factor", 100), new Path(getTaskID()
						.toString()), job.getOutputKeyComparator(), reporter,
				spilledRecordsCounter, null) : reduceCopier.createKVIterator(
				job, rfs, reporter);

		// free up the data structures
		mapOutputFilesOnDisk.clear();

		// setPhase(TaskStatus.Phase.REDUCE);
		// statusUpdate(umbilical);
		Class keyClass = job.getMapOutputKeyClass();
		Class valueClass = job.getMapOutputValueClass();
		RawComparator comparator = job.getOutputValueGroupingComparator();

		System.out.println("run recover");
		// if this is a caching iteration, do recovery
		if (loopCacheControl.isCacheWritten(conf, iteration, step))
			runRecover(job, umbilical, reporter, rIter, comparator, keyClass,
					valueClass);

		long end = System.currentTimeMillis();
		System.out.println("reducer recover running time overall: "
				+ (end - start) + "ms");
	}

	/**
	 * number of values seen for the current key while writing the cache
	 */
	private int numValues = 0;

	/**
	 * HaLoop reduce cache logic is mainly in this method
	 * 
	 * @param <INKEY>
	 * @param <INVALUE>
	 * @param <OUTKEY>
	 * @param <OUTVALUE>
	 * @param job
	 * @param umbilical
	 * @param reporter
	 * @param rIter
	 * @param comparator
	 * @param keyClass
	 * @param valueClass
	 * @throws IOException
	 */
	@SuppressWarnings("unchecked")
	private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runRecover(JobConf job,
			TaskUmbilicalProtocol umbilical, final TaskReporter reporter,
			RawKeyValueIterator rIter, RawComparator<INKEY> comparator,
			Class<INKEY> keyClass, Class<INVALUE> valueClass)
			throws IOException {

		ReduceValuesIterator<INKEY, INVALUE> values = isSkipping() ? new SkippingReduceValuesIterator<INKEY, INVALUE>(
				rIter, comparator, keyClass, valueClass, job, reporter,
				umbilical)
				: new ReduceValuesIterator<INKEY, INVALUE>(rIter, job
						.getOutputValueGroupingComparator(), keyClass,
						valueClass, job, reporter);

		long writePos = 0;
		SerializationFactory serializationFactory = new SerializationFactory(
				job);
		Serializer<INVALUE> valSerializer = serializationFactory
				.getSerializer(valueClass);
		Serializer<INKEY> indexKeySerializer = serializationFactory
				.getSerializer(keyClass);
		Serializer<LongWritable> indexPositionSerializer = serializationFactory
				.getSerializer(LongWritable.class);
		Serializer<IntWritable> sizeSerializer = serializationFactory
				.getSerializer(IntWritable.class);

		// placeholder streams; replaced with real local-FS streams below
		// when the cache is written for this (iteration, step)
		FSDataOutputStream fileOutput = new FSDataOutputStream(null);
		FSDataOutputStream indexOutput = new FSDataOutputStream(null);

		if (job.isIterative()
				&& loopCacheControl.isCacheWritten(job, iteration, step)) {
			Path filePath = mapOutputFile.getReduceCacheFileForWrite(
					getTaskID(), -1, round);
			Path indexPath = mapOutputFile.getCacheIndexFileForWrite(
					getTaskID(), -1, round);

			FileSystem localFs = FileSystem.getLocal(conf);
			FileSystem lfs = ((LocalFileSystem) localFs).getRaw();

			fileOutput = lfs.create(filePath);
			indexOutput = lfs.create(indexPath);

			valSerializer.open(fileOutput);
			indexKeySerializer.open(indexOutput);
			indexPositionSerializer.open(indexOutput);
			sizeSerializer.open(indexOutput);
		}
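		// Cache layout, as written below: the data file holds the serialized
		// values in key order, while the index file holds, for each key, the
		// serialized key followed by the byte offset (a LongWritable) of its
		// value run in the data file. Note that sizeSerializer is opened but
		// never used on this recovery path.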

		DataOutputBuffer bb = new DataOutputBuffer();
		DataInputBuffer ib = new DataInputBuffer();
		Serializer<INVALUE> ssl = serializationFactory
				.getSerializer(valueClass);
		Deserializer<INVALUE> dsl = serializationFactory
				.getDeserializer(valueClass);

		ssl.open(bb);
		dsl.open(ib);

		long iterationStart = System.currentTimeMillis();
		reduceTime = 0;
		long smallTime = 0;

		System.out.println("start recover reduce");

		while (values.more()) {
			INKEY key;

			// Yingyi's code: build the cache;
			if (conf.isIterative()
					&& loopCacheControl.isCacheWritten(job, iteration, step)) {
				// write cache, in a loop job
				key = values.getKey();
				indexKeySerializer.serialize(key);
				indexPositionSerializer.serialize(new LongWritable(writePos));

				numValues = 0;
				CacheWriteIterator<INKEY, INVALUE> valueIterator = new CacheWriteIterator<INKEY, INVALUE>(
						key, valSerializer, values);

				// just iterate, does not call reduce function
				while (valueIterator.hasNext()) {
					valueIterator.next();
				}

				writePos = fileOutput.getPos();
				values.nextKey();
			} else {
				// defensive: advance the iterator even when caching is
				// disabled, so this loop cannot spin forever
				values.nextKey();
			}
		}

		if (conf.isIterative()
				&& loopCacheControl.isCacheWritten(job, iteration, step)) {
			fileOutput.close();
			indexOutput.close();
		}

		System.out.println("recovery iteration " + round + ":" + reduceTime
				+ "ms");
		System.out.println("recovery branch time: " + smallTime);
		long iterationEnd = System.currentTimeMillis();
		System.out.println("recovery total time: "
				+ (iterationEnd - iterationStart) + "ms");
	}

	private class CacheWriteIterator<K, T> extends ValuesIterator<K, T> {
		Serializer<T> serializer = null;

		ValuesIterator<K, T> values = null;
		int count = 0;
		K key = null;

		public CacheWriteIterator(K k, Serializer<T> ser, Iterator<T> vals) {
			key = k;
			serializer = ser;
			values = (ValuesIterator<K, T>) vals;
		}

		public T next() {
			numValues++;
			T value = values.next();
			count++;

			try {
				if (loopCacheFilter.isCache(key, value, count))
					serializer.serialize(value);
				return value;
			} catch (IOException e) {
				e.printStackTrace();
				return null;
			}
		}

		public boolean more() {
			return values.more();
		}

		public boolean hasNext() {
			if (serializer == null || values == null)
				return false;
			return values.hasNext();
		}

		public void remove() {
		}
	}

	private long reduceTime = 0;

	static class NewTrackingRecordWriter<K, V> extends
			org.apache.hadoop.mapreduce.RecordWriter<K, V> {
		private final org.apache.hadoop.mapreduce.RecordWriter<K, V> real;
		private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;

		NewTrackingRecordWriter(
				org.apache.hadoop.mapreduce.RecordWriter<K, V> real,
				org.apache.hadoop.mapreduce.Counter recordCounter) {
			this.real = real;
			this.outputRecordCounter = recordCounter;
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			real.close(context);
		}

		@Override
		public void write(K key, V value) throws IOException,
				InterruptedException {
			real.write(key, value);
			outputRecordCounter.increment(1);
		}
	}

	class ReduceCopier<K, V> implements MRConstants {

		/** Reference to the umbilical object */
		private TaskUmbilicalProtocol umbilical;
		private final TaskReporter reporter;

		/** Number of ms before timing out a copy */
		private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;

		/** Max events to fetch in one go from the tasktracker */
		private static final int MAX_EVENTS_TO_FETCH = 10000;

		/**
		 * our reduce task instance
		 */
		private RecoverReducerTask reduceTask;

		/**
		 * the list of map outputs currently being copied
		 */
		private List<MapOutputLocation> scheduledCopies;

		/**
		 * the results of dispatched copy attempts
		 */
		private List<CopyResult> copyResults;

		/**
		 * the number of outputs to copy in parallel
		 */
		private int numCopiers;

		/**
		 * the maximum number of fetches we'd schedule at once before pausing
		 * scheduling
		 */
		private int maxInFlight;

		/**
		 * the amount of time spent on fetching one map output before
		 * considering it as failed and notifying the jobtracker about it.
		 */
		private int maxBackoff;

		/**
		 * busy hosts from which copies are being backed off; maps host ->
		 * next contact time
		 */
		private Map<String, Long> penaltyBox;

		/**
		 * the set of unique hosts from which we are copying
		 */
		private Set<String> uniqueHosts;

		/**
		 * A reference to the RamManager for writing the map outputs to.
		 */

		private ShuffleRamManager ramManager;

		/**
		 * A reference to the local file system for writing the map outputs to.
		 */
		private FileSystem localFileSys;

		private FileSystem rfs;
		/**
		 * Number of files to merge at a time
		 */
		private int ioSortFactor;

		/**
		 * A reference to the throwable object (if merge throws an exception)
		 */
		private volatile Throwable mergeThrowable;

		/**
		 * A flag to indicate when to exit localFS merge
		 */
		private volatile boolean exitLocalFSMerge = false;

		/**
		 * A flag to indicate when to exit getMapEvents thread
		 */
		private volatile boolean exitGetMapEvents = false;

		/**
		 * When we accumulate maxInMemOutputs number of files in ram, we
		 * merge/spill
		 */
		private final int maxInMemOutputs;

		/**
		 * Usage threshold for in-memory output accumulation.
		 */
		private final float maxInMemCopyPer;

		/**
		 * Maximum memory usage of map outputs to merge from memory into the
		 * reduce, in bytes.
		 */
		private final long maxInMemReduce;

		/**
		 * The threads for fetching the files.
		 */
		private List<MapOutputCopier> copiers = null;

		/**
		 * The object for metrics reporting.
		 */
		private ShuffleClientMetrics shuffleClientMetrics = null;

		/**
		 * the minimum interval between tasktracker polls
		 */
		private static final long MIN_POLL_INTERVAL = 1000;

		/**
		 * a list of map output locations for fetch retrials
		 */
		private List<MapOutputLocation> retryFetches = new ArrayList<MapOutputLocation>();

		/**
		 * The set of map outputs that have already been copied
		 */
		private Set<TaskID> copiedMapOutputs = Collections
				.synchronizedSet(new TreeSet<TaskID>());

		/**
		 * The set of obsolete map taskids.
		 */
		private Set<TaskAttemptID> obsoleteMapIds = Collections
				.synchronizedSet(new TreeSet<TaskAttemptID>());

		private Random random = null;

		/**
		 * the max of all the map completion times
		 */
		private int maxMapRuntime;

		/**
		 * Maximum number of fetch-retries per-map.
		 */
		private volatile int maxFetchRetriesPerMap;

		/**
		 * Combiner runner, if a combiner is needed
		 */
		private CombinerRunner combinerRunner;

		/**
		 * Resettable collector used for combine.
		 */
		private CombineOutputCollector combineCollector = null;

		/**
		 * Maximum percent of failed fetch attempts before killing the reduce
		 * task.
		 */
		private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;

		/**
		 * Minimum percent of progress required to keep the reduce alive.
		 */
		private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;

		/**
		 * Maximum percent of shuffle execution time required to keep the
		 * reducer alive.
		 */
		private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;

		/**
		 * Minimum number of map fetch retries.
		 */
		private static final int MIN_FETCH_RETRIES_PER_MAP = 2;

		/**
		 * Maximum no. of unique maps from which we failed to fetch map-outputs
		 * even after {@link #maxFetchRetriesPerMap} retries; after this the
		 * reduce task is failed.
		 */
		private int maxFailedUniqueFetches = 5;

		/**
		 * The maps from which we fail to fetch map-outputs even after
		 * {@link #maxFetchRetriesPerMap} retries.
		 */
		Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();

		/**
		 * A map of taskId -> no. of failed fetches
		 */
		Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap = new HashMap<TaskAttemptID, Integer>();

		/**
		 * Initial backoff interval (milliseconds)
		 */
		private static final int BACKOFF_INIT = 4000;

		/**
		 * The interval for logging in the shuffle
		 */
		private static final int MIN_LOG_TIME = 60000;

		/**
		 * List of in-memory map-outputs.
		 */
		private final List<MapOutput> mapOutputsFilesInMemory = Collections
				.synchronizedList(new LinkedList<MapOutput>());

		/**
		 * Map from host to the list of map output locations available on
		 * that host
		 */
		private final Map<String, List<MapOutputLocation>> mapLocations = new ConcurrentHashMap<String, List<MapOutputLocation>>();

		/**
		 * This class contains the methods that should be used for
		 * metrics-reporting the specific metrics for shuffle. This class
		 * actually reports the metrics for the shuffle client (the ReduceTask),
		 * and hence the name ShuffleClientMetrics.
		 */
		class ShuffleClientMetrics implements Updater {
			private MetricsRecord shuffleMetrics = null;
			private int numFailedFetches = 0;
			private int numSuccessFetches = 0;
			private long numBytes = 0;
			private int numThreadsBusy = 0;

			ShuffleClientMetrics(JobConf conf) {
				MetricsContext metricsContext = MetricsUtil
						.getContext("mapred");
				this.shuffleMetrics = MetricsUtil.createRecord(metricsContext,
						"shuffleInput");
				this.shuffleMetrics.setTag("user", conf.getUser());
				this.shuffleMetrics.setTag("jobName", conf.getJobName());
				this.shuffleMetrics.setTag("jobId", RecoverReducerTask.this
						.getJobID().toString());
				this.shuffleMetrics.setTag("taskId", getTaskID().toString());
				this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
				metricsContext.registerUpdater(this);
			}

			public synchronized void inputBytes(long numBytes) {
				this.numBytes += numBytes;
			}

			public synchronized void failedFetch() {
				++numFailedFetches;
			}

			public synchronized void successFetch() {
				++numSuccessFetches;
			}

			public synchronized void threadBusy() {
				++numThreadsBusy;
			}

			public synchronized void threadFree() {
				--numThreadsBusy;
			}

			public void doUpdates(MetricsContext unused) {
				synchronized (this) {
					shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
					shuffleMetrics.incrMetric("shuffle_failed_fetches",
							numFailedFetches);
					shuffleMetrics.incrMetric("shuffle_success_fetches",
							numSuccessFetches);
					if (numCopiers != 0) {
						shuffleMetrics.setMetric(
								"shuffle_fetchers_busy_percent",
								100 * ((float) numThreadsBusy / numCopiers));
					} else {
						shuffleMetrics.setMetric(
								"shuffle_fetchers_busy_percent", 0);
					}
					numBytes = 0;
					numSuccessFetches = 0;
					numFailedFetches = 0;
				}
				shuffleMetrics.update();
			}
		}

		/** Represents the result of an attempt to copy a map output */
		private class CopyResult {

			// the map output location against which a copy attempt was made
			private final MapOutputLocation loc;

			// the size of the file copied, -1 if the transfer failed
			private final long size;

			// a flag signifying whether a copy result is obsolete
			private static final int OBSOLETE = -2;

			CopyResult(MapOutputLocation loc, long size) {
				this.loc = loc;
				this.size = size;
			}

			public boolean getSuccess() {
				return size >= 0;
			}

			public boolean isObsolete() {
				return size == OBSOLETE;
			}

			public long getSize() {
				return size;
			}

			public String getHost() {
				return loc.getHost();
			}

			public MapOutputLocation getLocation() {
				return loc;
			}
		}

		private int nextMapOutputCopierId = 0;

		/**
		 * Abstraction to track a map-output.
		 */
		private class MapOutputLocation {
			TaskAttemptID taskAttemptId;
			TaskID taskId;
			String ttHost;
			URL taskOutput;

			public MapOutputLocation(TaskAttemptID taskAttemptId,
					String ttHost, URL taskOutput) {
				this.taskAttemptId = taskAttemptId;
				this.taskId = this.taskAttemptId.getTaskID();
				this.ttHost = ttHost;
				this.taskOutput = taskOutput;
			}

			public TaskAttemptID getTaskAttemptId() {
				return taskAttemptId;
			}

			public TaskID getTaskId() {
				return taskId;
			}

			public String getHost() {
				return ttHost;
			}

			public URL getOutputLocation() {
				return taskOutput;
			}
		}

		/** Describes the output of a map; could either be on disk or in-memory. */
		private class MapOutput {
			final TaskID mapId;
			final TaskAttemptID mapAttemptId;

			final Path file;
			final Configuration conf;

			byte[] data;
			final boolean inMemory;
			long compressedSize;

			public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
					Configuration conf, Path file, long size) {
				this.mapId = mapId;
				this.mapAttemptId = mapAttemptId;

				this.conf = conf;
				this.file = file;
				this.compressedSize = size;

				this.data = null;

				this.inMemory = false;
			}

			public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
					byte[] data, int compressedLength) {
				this.mapId = mapId;
				this.mapAttemptId = mapAttemptId;

				this.file = null;
				this.conf = null;

				this.data = data;
				this.compressedSize = compressedLength;

				this.inMemory = true;
			}

			public void discard() throws IOException {
				if (inMemory) {
					data = null;
				} else {
					FileSystem fs = file.getFileSystem(conf);
					fs.delete(file, true);
				}
			}
		}

		class ShuffleRamManager implements RamManager {
			/*
			 * Maximum percentage of the in-memory limit that a single shuffle
			 * can consume
			 */
			private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;

			/*
			 * Maximum percentage of shuffle-threads which can be stalled
			 * simultaneously after which a merge is triggered.
			 */
			private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;

			private final int maxSize;
			private final int maxSingleShuffleLimit;

			private int size = 0;

			private Object dataAvailable = new Object();
			private int fullSize = 0;
			private int numPendingRequests = 0;
			private int numRequiredMapOutputs = 0;
			private int numClosed = 0;
			private boolean closed = false;

			public ShuffleRamManager(Configuration conf) throws IOException {
				final float maxInMemCopyUse = conf.getFloat(
						"mapred.job.shuffle.input.buffer.percent", 0.70f);
				if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
					throw new IOException(
							"mapred.job.shuffle.input.buffer.percent"
									+ maxInMemCopyUse);
				}
				maxSize = (int) Math.min(Runtime.getRuntime().maxMemory()
						* maxInMemCopyUse, Integer.MAX_VALUE);
				maxSingleShuffleLimit = (int) (maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
				LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize
						+ ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
			}
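			// Example: with a 1 GB max heap and the default buffer percent
			// of 0.70, maxSize is roughly 716 MB and maxSingleShuffleLimit
			// (25% of maxSize) is roughly 179 MB.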

			public synchronized boolean reserve(int requestedSize,
					InputStream in) throws InterruptedException {
				// Wait till the request can be fulfilled...
				while ((size + requestedSize) > maxSize) {

					// Close the input...
					if (in != null) {
						try {
							in.close();
						} catch (IOException ie) {
							LOG.info("Failed to close connection with: " + ie);
						} finally {
							in = null;
						}
					}

					// Track pending requests
					synchronized (dataAvailable) {
						++numPendingRequests;
						dataAvailable.notify();
					}

					// Wait for memory to free up
					wait();

					// Track pending requests
					synchronized (dataAvailable) {
						--numPendingRequests;
					}
				}

				size += requestedSize;

				return (in != null);
			}

			public synchronized void unreserve(int requestedSize) {
				size -= requestedSize;

				synchronized (dataAvailable) {
					fullSize -= requestedSize;
					--numClosed;
				}

				// Notify the threads blocked on RamManager.reserve
				notifyAll();
			}

			public boolean waitForDataToMerge() throws InterruptedException {
				boolean done = false;
				synchronized (dataAvailable) {
					// Start in-memory merge if manager has been closed or...
					while (!closed
							&&
							// In-memory threshold exceeded and at least two
							// segments
							// have been fetched
							(getPercentUsed() < maxInMemCopyPer || numClosed < 2)
							&&
							// More than "mapred.inmem.merge.threshold" map
							// outputs
							// have been fetched into memory
							(maxInMemOutputs <= 0 || numClosed < maxInMemOutputs)
							&&
							// More than MAX... threads are blocked on the
							// RamManager
							// or the blocked threads are the last map outputs
							// to be
							// fetched. If numRequiredMapOutputs is zero, either
							// setNumCopiedMapOutputs has not been called (no
							// map outputs
							// have been fetched, so there is nothing to merge)
							// or the
							// last map outputs being transferred without
							// contention, so a merge would be premature.
							(numPendingRequests < numCopiers
									* MAX_STALLED_SHUFFLE_THREADS_FRACTION && (0 == numRequiredMapOutputs || numPendingRequests < numRequiredMapOutputs))) {
						dataAvailable.wait();
					}
					done = closed;
				}
				return done;
			}

			public void closeInMemoryFile(int requestedSize) {
				synchronized (dataAvailable) {
					fullSize += requestedSize;
					++numClosed;
					dataAvailable.notify();
				}
			}

			public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
				synchronized (dataAvailable) {
					this.numRequiredMapOutputs = numRequiredMapOutputs;
					dataAvailable.notify();
				}
			}

			public void close() {
				synchronized (dataAvailable) {
					closed = true;
					LOG.info("Closed ram manager");
					dataAvailable.notify();
				}
			}

			private float getPercentUsed() {
				return (float) fullSize / maxSize;
			}

			int getMemoryLimit() {
				return maxSize;
			}

			boolean canFitInMemory(long requestedSize) {
				return (requestedSize < Integer.MAX_VALUE && requestedSize < maxSingleShuffleLimit);
			}
		}
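		// RamManager lifecycle, as used by the copiers below: reserve()
		// blocks until enough in-memory budget is free, closeInMemoryFile()
		// marks a fully-fetched output as eligible for the in-memory merge,
		// and unreserve() returns the budget once a segment is merged or
		// discarded.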

		/** Copies map outputs as they become available */
		private class MapOutputCopier extends Thread {
			// basic/unit connection timeout (in milliseconds)
			private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
			// default read timeout (in milliseconds)
			private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;

			private MapOutputLocation currentLocation = null;
			private int id = nextMapOutputCopierId++;
			private Reporter reporter;

			// Decompression of map-outputs
			private CompressionCodec codec = null;
			private Decompressor decompressor = null;

			public MapOutputCopier(JobConf job, Reporter reporter) {
				setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
				LOG.debug(getName() + " created");
				this.reporter = reporter;

				if (job.getCompressMapOutput()) {
					Class<? extends CompressionCodec> codecClass = job
							.getMapOutputCompressorClass(DefaultCodec.class);
					codec = ReflectionUtils.newInstance(codecClass, job);
					decompressor = CodecPool.getDecompressor(codec);
				}
			}

			/**
			 * Fail the current file that we are fetching
			 * 
			 * @return were we currently fetching?
			 */
			public synchronized boolean fail() {
				if (currentLocation != null) {
					finish(-1);
					return true;
				} else {
					return false;
				}
			}

			/**
			 * Get the current map output location.
			 */
			public synchronized MapOutputLocation getLocation() {
				return currentLocation;
			}

			private synchronized void start(MapOutputLocation loc) {
				currentLocation = loc;
			}

			private synchronized void finish(long size) {
				if (currentLocation != null) {
					LOG.debug(getName() + " finishing " + currentLocation
							+ " =" + size);
					synchronized (copyResults) {
						copyResults.add(new CopyResult(currentLocation, size));
						copyResults.notify();
					}
					currentLocation = null;
				}
			}

			/**
			 * Loop forever and fetch map outputs as they become available. The
			 * thread exits when it is interrupted by {@link ReduceTaskRunner}
			 */
			@Override
			public void run() {
				while (true) {
					try {
						MapOutputLocation loc = null;
						long size = -1;

						synchronized (scheduledCopies) {
							while (scheduledCopies.isEmpty()) {
								scheduledCopies.wait();
							}
							loc = scheduledCopies.remove(0);
						}

						try {
							shuffleClientMetrics.threadBusy();
							start(loc);
							size = copyOutput(loc);
							shuffleClientMetrics.successFetch();
						} catch (IOException e) {
							LOG.warn(reduceTask.getTaskID() + " copy failed: "
									+ loc.getTaskAttemptId() + " from "
									+ loc.getHost());
							LOG.warn(StringUtils.stringifyException(e));
							shuffleClientMetrics.failedFetch();

							// Reset
							size = -1;
						} finally {
							shuffleClientMetrics.threadFree();
							finish(size);
						}
					} catch (InterruptedException e) {
						break; // ALL DONE
					} catch (FSError e) {
						LOG.error("Task: " + reduceTask.getTaskID()
								+ " - FSError: "
								+ StringUtils.stringifyException(e));
						try {
							umbilical.fsError(reduceTask.getTaskID(), e
									.getMessage());
						} catch (IOException io) {
							LOG.error("Could not notify TT of FSError: "
									+ StringUtils.stringifyException(io));
						}
					} catch (Throwable th) {
						String msg = getTaskID()
								+ " : Map output copy failure : "
								+ StringUtils.stringifyException(th);
						reportFatalError(getTaskID(), th, msg);
					}
				}

				if (decompressor != null) {
					CodecPool.returnDecompressor(decompressor);
				}

			}

			/**
			 * Copies a map output from a remote host, via HTTP.
			 * 
			 * @param loc
			 *            the map output location to be copied
			 * @return the number of bytes copied, or CopyResult.OBSOLETE if
			 *         the copy was unnecessary
			 * @throws IOException
			 *             if there is an error copying the file
			 * @throws InterruptedException
			 *             if the copier should give up
			 */
			private long copyOutput(MapOutputLocation loc) throws IOException,
					InterruptedException {
				// check if we still need to copy the output from this location
				if (copiedMapOutputs.contains(loc.getTaskId())
						|| obsoleteMapIds.contains(loc.getTaskAttemptId())) {
					return CopyResult.OBSOLETE;
				}

				// a temp filename. If this file gets created in ramfs, we're
				// fine; else, we will check the localFS to find a suitable
				// final location for this path
				TaskAttemptID reduceId = reduceTask.getTaskID();
				Path filename = new Path("/"
						+ TaskTracker.getIntermediateOutputDir(reduceId
								.getJobID().toString(), reduceId.toString())
						+ "/map_" + loc.getTaskId().getId() + ".out");

				// Copy the map output to a temp file whose name is unique to
				// this attempt
				Path tmpMapOutput = new Path(filename + "-" + id);

				// Copy the map output
				MapOutput mapOutput = getMapOutput(loc, tmpMapOutput, reduceId
						.getTaskID().getId());
				if (mapOutput == null) {
					throw new IOException("Failed to fetch map-output for "
							+ loc.getTaskAttemptId() + " from " + loc.getHost());
				}

				// The size of the map-output
				long bytes = mapOutput.compressedSize;

				// lock the ReduceTask while we do the rename
				synchronized (RecoverReducerTask.this) {
					if (copiedMapOutputs.contains(loc.getTaskId())) {
						mapOutput.discard();
						return CopyResult.OBSOLETE;
					}

					// Special case: discard empty map-outputs
					if (bytes == 0) {
						try {
							mapOutput.discard();
						} catch (IOException ioe) {
							LOG.info("Couldn't discard output of "
									+ loc.getTaskId());
						}

						// Note that we successfully copied the map-output
						noteCopiedMapOutput(loc.getTaskId());

						return bytes;
					}

					// Process map-output
					if (mapOutput.inMemory) {
						// Save it in the synchronized list of map-outputs
						mapOutputsFilesInMemory.add(mapOutput);
					} else {
						// Rename the temporary file to the final file;
						// ensure it is on the same partition
						tmpMapOutput = mapOutput.file;
						filename = new Path(tmpMapOutput.getParent(), filename
								.getName());
						if (!localFileSys.rename(tmpMapOutput, filename)) {
							localFileSys.delete(tmpMapOutput, true);
							bytes = -1;
							throw new IOException(
									"Failed to rename map output "
											+ tmpMapOutput + " to " + filename);
						}

						synchronized (mapOutputFilesOnDisk) {
							addToMapOutputFilesOnDisk(localFileSys
									.getFileStatus(filename));
						}
					}

					// Note that we successfully copied the map-output
					noteCopiedMapOutput(loc.getTaskId());
				}

				return bytes;
			}

			/**
			 * Save the map taskid whose output we just copied. This function
			 * assumes that it has been synchronized on RecoverReducerTask.this.
			 * 
			 * @param taskId
			 *            map taskid
			 */
			private void noteCopiedMapOutput(TaskID taskId) {
				copiedMapOutputs.add(taskId);
				ramManager.setNumCopiedMapOutputs(numMaps
						- copiedMapOutputs.size());
			}

			/**
			 * Get the map output into a local file (either in the inmemory fs
			 * or on the local fs) from the remote server. We use the file
			 * system so that we generate checksum files on the data.
			 * 
			 * @param mapOutputLoc
			 *            map-output to be fetched
			 * @param filename
			 *            the filename to write the data into
			 * @param reduce
			 *            the id of the reduce task fetching this output
			 * @return the fetched map output, in memory or on disk
			 * @throws IOException
			 *             when something goes wrong
			 */
			private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
					Path filename, int reduce) throws IOException,
					InterruptedException {
				// Connect
				URLConnection connection = mapOutputLoc.getOutputLocation()
						.openConnection();
				InputStream input = getInputStream(connection,
						STALLED_COPY_TIMEOUT, DEFAULT_READ_TIMEOUT);

				// Validate header from map output
				TaskAttemptID mapId = null;
				try {
					mapId = TaskAttemptID.forName(connection
							.getHeaderField(FROM_MAP_TASK));
				} catch (IllegalArgumentException ia) {
					LOG.warn("Invalid map id ", ia);
					return null;
				}
				TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
				if (!mapId.equals(expectedMapId)) {
					LOG.warn("data from wrong map: " + mapId
							+ " arrived at reduce task " + reduce
							+ ", whereas the expected map output is from "
							+ expectedMapId);
					return null;
				}

				long decompressedLength = Long.parseLong(connection
						.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
				long compressedLength = Long.parseLong(connection
						.getHeaderField(MAP_OUTPUT_LENGTH));

				if (compressedLength < 0 || decompressedLength < 0) {
					LOG.warn(getName()
							+ " invalid lengths in map output header: id: "
							+ mapId + " compressed len: " + compressedLength
							+ ", decompressed len: " + decompressedLength);
					return null;
				}
				int forReduce = Integer.parseInt(connection
						.getHeaderField(FOR_REDUCE_TASK));

				if (forReduce != reduce) {
					LOG.warn("data for the wrong reduce: " + forReduce
							+ " with compressed len: " + compressedLength
							+ ", decompressed len: " + decompressedLength
							+ " arrived to reduce task " + reduce);
					return null;
				}
				LOG.info("header: " + mapId + ", compressed len: "
						+ compressedLength + ", decompressed len: "
						+ decompressedLength);

				// We will put a file in memory if it meets certain criteria:
				// 1. The size of the (decompressed) file should be less than
				//    25% of the total in-memory fs
				// 2. There is space available in the in-memory fs

				// Check if this map-output can be saved in-memory
				boolean shuffleInMemory = ramManager
						.canFitInMemory(decompressedLength);

				// Shuffle
				MapOutput mapOutput = null;

				// HaLoop: in-memory shuffling is disabled for comparison
				// experiments; always shuffle to disk
				shuffleInMemory = false;
				if (shuffleInMemory) {
					LOG.info("Shuffling " + decompressedLength + " bytes ("
							+ compressedLength + " raw bytes) "
							+ "into RAM from "
							+ mapOutputLoc.getTaskAttemptId());

					mapOutput = shuffleInMemory(mapOutputLoc, connection,
							input, (int) decompressedLength,
							(int) compressedLength);
				} else {
					LOG.info("Shuffling " + decompressedLength + " bytes ("
							+ compressedLength + " raw bytes) "
							+ "into Local-FS from "
							+ mapOutputLoc.getTaskAttemptId());

					mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
							compressedLength);
				}

				return mapOutput;
			}

			/**
			 * The connection establishment is attempted multiple times and is
			 * given up only on the last failure. Instead of connecting with a
			 * timeout of X, we try connecting with a timeout of x < X but
			 * multiple times. For example, with a 180 s total timeout and a
			 * 30 s unit, up to six connection attempts are made before the
			 * last IOException is rethrown.
			 */
			private InputStream getInputStream(URLConnection connection,
					int connectionTimeout, int readTimeout) throws IOException {
				int unit = 0;
				if (connectionTimeout < 0) {
					throw new IOException("Invalid timeout " + "[timeout = "
							+ connectionTimeout + " ms]");
				} else if (connectionTimeout > 0) {
					unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout) ? connectionTimeout
							: UNIT_CONNECT_TIMEOUT;
				}
				// set the read timeout to the total timeout
				connection.setReadTimeout(readTimeout);
				// set the connect timeout to the unit-connect-timeout
				connection.setConnectTimeout(unit);
				while (true) {
					try {
						return connection.getInputStream();
					} catch (IOException ioe) {
						// update the total remaining connect-timeout
						connectionTimeout -= unit;

						// throw an exception if we have waited for the full
						// timeout; note that the updated value of the
						// timeout is used here
						if (connectionTimeout == 0) {
							throw ioe;
						}

						// reset the connect timeout for the last try
						if (connectionTimeout < unit) {
							unit = connectionTimeout;
							// reset the connect time out for the final connect
							connection.setConnectTimeout(unit);
						}
					}
				}
			}

			private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
					URLConnection connection, InputStream input,
					int mapOutputLength, int compressedLength)
					throws IOException, InterruptedException {
				// Reserve ram for the map-output
				boolean createdNow = ramManager.reserve(mapOutputLength, input);

				// Reconnect if we need to
				if (!createdNow) {
					// Reconnect
					try {
						connection = mapOutputLoc.getOutputLocation()
								.openConnection();
						input = getInputStream(connection,
								STALLED_COPY_TIMEOUT, DEFAULT_READ_TIMEOUT);
					} catch (IOException ioe) {
						LOG.info("Failed to reopen connection to fetch map-output from "
								+ mapOutputLoc.getHost());

						// Inform the ram-manager
						ramManager.closeInMemoryFile(mapOutputLength);
						ramManager.unreserve(mapOutputLength);

						throw ioe;
					}
				}

				IFileInputStream checksumIn = new IFileInputStream(input,
						compressedLength);

				input = checksumIn;

				// Are map-outputs compressed?
				if (codec != null) {
					decompressor.reset();
					input = codec.createInputStream(input, decompressor);
				}

				// Copy map-output into an in-memory buffer
				byte[] shuffleData = new byte[mapOutputLength];
				MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(),
						mapOutputLoc.getTaskAttemptId(), shuffleData,
						compressedLength);

				int bytesRead = 0;
				try {
					int n = input.read(shuffleData, 0, shuffleData.length);
					while (n > 0) {
						bytesRead += n;
						shuffleClientMetrics.inputBytes(n);

						// indicate we're making progress
						reporter.progress();
						n = input.read(shuffleData, bytesRead,
								(shuffleData.length - bytesRead));
					}

					LOG.info("Read " + bytesRead
							+ " bytes from map-output for "
							+ mapOutputLoc.getTaskAttemptId());

					input.close();
				} catch (IOException ioe) {
					LOG.info("Failed to shuffle from "
							+ mapOutputLoc.getTaskAttemptId(), ioe);

					// Inform the ram-manager
					ramManager.closeInMemoryFile(mapOutputLength);
					ramManager.unreserve(mapOutputLength);

					// Discard the map-output
					try {
						mapOutput.discard();
					} catch (IOException ignored) {
						LOG.info("Failed to discard map-output from "
								+ mapOutputLoc.getTaskAttemptId(), ignored);
					}
					mapOutput = null;

					// Close the streams
					IOUtils.cleanup(LOG, input);

					// Re-throw
					throw ioe;
				}

				// Close the in-memory file, using the number of bytes
				// actually read rather than the advertised mapOutputLength
				// ramManager.closeInMemoryFile(mapOutputLength);
				ramManager.closeInMemoryFile(bytesRead);

				// TODO: Remove this after a 'fix' for HADOOP-3647
				if (mapOutputLength > 0) {
					DataInputBuffer dib = new DataInputBuffer();
					dib.reset(shuffleData, 0, shuffleData.length);
					LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId()
							+ " -> (" + WritableUtils.readVInt(dib) + ", "
							+ WritableUtils.readVInt(dib) + ") from "
							+ mapOutputLoc.getHost());
				} else {
					LOG.info("map output length: " + mapOutputLength);
				}

				return mapOutput;
			}

			private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
					InputStream input, Path filename, long mapOutputLength)
					throws IOException {
				// Find out a suitable location for the output on
				// local-filesystem
				Path localFilename = lDirAlloc.getLocalPathForWrite(filename
						.toUri().getPath(), mapOutputLength, conf);

				MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(),
						mapOutputLoc.getTaskAttemptId(), conf, localFileSys
								.makeQualified(localFilename), mapOutputLength);

				// Copy data to local-disk
				OutputStream output = null;
				long bytesRead = 0;
				try {
					output = rfs.create(localFilename);

					byte[] buf = new byte[64 * 1024];
					int n = input.read(buf, 0, buf.length);
					while (n > 0) {
						bytesRead += n;
						shuffleClientMetrics.inputBytes(n);
						output.write(buf, 0, n);

						// indicate we're making progress
						reporter.progress();
						n = input.read(buf, 0, buf.length);
					}

					LOG.info("Read " + bytesRead
							+ " bytes from map-output for "
							+ mapOutputLoc.getTaskAttemptId());

					output.close();
					input.close();
				} catch (IOException ioe) {
					LOG.info("Failed to shuffle from "
							+ mapOutputLoc.getTaskAttemptId(), ioe);

					// Discard the map-output
					try {
						mapOutput.discard();
					} catch (IOException ignored) {
						LOG.info("Failed to discard map-output from "
								+ mapOutputLoc.getTaskAttemptId(), ignored);
					}
					mapOutput = null;

					// Close the streams
					IOUtils.cleanup(LOG, input, output);

					// Re-throw
					throw ioe;
				}

				// Sanity check
				if (bytesRead != mapOutputLength) {
					try {
						mapOutput.discard();
					} catch (Exception ioe) {
						// IGNORED because we are cleaning up
						LOG.info("Failed to discard map-output from "
								+ mapOutputLoc.getTaskAttemptId(), ioe);
					} catch (Throwable t) {
						String msg = getTaskID()
								+ " : Failed in shuffle to disk :"
								+ StringUtils.stringifyException(t);
						reportFatalError(getTaskID(), t, msg);
					}
					mapOutput = null;

					throw new IOException("Incomplete map output received for "
							+ mapOutputLoc.getTaskAttemptId() + " from "
							+ mapOutputLoc.getOutputLocation() + " ("
							+ bytesRead + " instead of " + mapOutputLength
							+ ")");
				}

				return mapOutput;

			}

		} // MapOutputCopier

		private void configureClasspath(JobConf conf) throws IOException {

			// get the task and the current classloader which will become the
			// parent
			Task task = RecoverReducerTask.this;
			ClassLoader parent = conf.getClassLoader();

			// get the work directory which holds the elements we are
			// dynamically adding to the classpath
			File workDir = new File(task.getJobFile()).getParentFile();
			ArrayList<URL> urllist = new ArrayList<URL>();

			// add the jars and directories to the classpath
			String jar = conf.getJar();
			if (jar != null) {
				File jobCacheDir = new File(new Path(jar).getParent()
						.toString());

				File[] libs = new File(jobCacheDir, "lib").listFiles();
				if (libs != null) {
					for (int i = 0; i < libs.length; i++) {
						urllist.add(libs[i].toURI().toURL());
					}
				}
				urllist.add(new File(jobCacheDir, "classes").toURI().toURL());
				urllist.add(jobCacheDir.toURI().toURL());
			}
			urllist.add(workDir.toURI().toURL());

			// create a new classloader with the old classloader as its parent
			// then set that classloader as the one used by the current jobconf
			URL[] urls = urllist.toArray(new URL[urllist.size()]);
			URLClassLoader loader = new URLClassLoader(urls, parent);
			conf.setClassLoader(loader);
		}

		public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
				TaskReporter reporter) throws ClassNotFoundException,
				IOException {

			configureClasspath(conf);
			this.reporter = reporter;
			this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
			this.umbilical = umbilical;
			this.reduceTask = RecoverReducerTask.this;

			this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
			this.copyResults = new ArrayList<CopyResult>(100);
			this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
			this.maxInFlight = 4 * numCopiers;
			this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
			Counters.Counter combineInputCounter = reporter
					.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
			this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
					combineInputCounter, reporter, null);
			if (combinerRunner != null) {
				combineCollector = new CombineOutputCollector(
						reduceCombineOutputCounter);
			}

			this.ioSortFactor = conf.getInt("io.sort.factor", 10);
			// the exponential backoff formula is
			//   backoff(t) = init * base^(t-1)
			// so for the maximum total wait we want
			//   backoff(1) + ... + backoff(max_fetch_retries) ~ max
			// solving which gives
			//   max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
			// for the default max = 300 (5 min) we get max_fetch_retries = 6:
			// the backoffs are 4, 8, 16, 32, 64, 128 seconds, whose sum is
			// 252 s ~ 4.2 min

			// specializing to base 2
			this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
					getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
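			// Worked example (assuming BACKOFF_INIT is the 4-second initial
			// backoff noted above, i.e. 4000 ms): with the default maxBackoff
			// of 300, (300 * 1000 / 4000) + 1 = 76 and
			// getClosestPowerOf2(76) = 6, matching max_fetch_retries = 6.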
			this.maxFailedUniqueFetches = Math.min(numMaps,
					this.maxFailedUniqueFetches);
			this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold",
					1000);
			this.maxInMemCopyPer = conf.getFloat(
					"mapred.job.shuffle.merge.percent", 0.66f);
			final float maxRedPer = conf.getFloat(
					"mapred.job.reduce.input.buffer.percent", 0f);
			if (maxRedPer > 1.0 || maxRedPer < 0.0) {
				throw new IOException("mapred.job.reduce.input.buffer.percent "
						+ maxRedPer);
			}
			this.maxInMemReduce = (int) Math.min(
					Runtime.getRuntime().maxMemory() * maxRedPer,
					Integer.MAX_VALUE);

			// Setup the RamManager
			ramManager = new ShuffleRamManager(conf);

			localFileSys = FileSystem.getLocal(conf);

			rfs = ((LocalFileSystem) localFileSys).getRaw();

			// hosts -> next contact time
			this.penaltyBox = new LinkedHashMap<String, Long>();

			// hostnames
			this.uniqueHosts = new HashSet<String>();

			// Seed the random number generator with a reasonably globally
			// unique seed
			long randomSeed = System.nanoTime()
					+ (long) Math.pow(this.reduceTask.getPartition(),
							(this.reduceTask.getPartition() % 10));
			this.random = new Random(randomSeed);
			this.maxMapRuntime = 0;
		}

		private boolean busyEnough(int numInFlight) {
			return numInFlight > maxInFlight;
		}

		public boolean fetchOutputs() throws IOException {
			int totalFailures = 0;
			int numInFlight = 0, numCopied = 0;
			DecimalFormat mbpsFormat = new DecimalFormat("0.00");
			final Progress copyPhase = reduceTask.getProgress().phase();
			LocalFSMerger localFSMergerThread = null;
			InMemFSMergeThread inMemFSMergeThread = null;
			GetMapEventsThread getMapEventsThread = null;

			for (int i = 0; i < numMaps; i++) {
				copyPhase.addPhase(); // add sub-phase per file
			}

			copiers = new ArrayList<MapOutputCopier>(numCopiers);

			// start all the copying threads
			for (int i = 0; i < numCopiers; i++) {
				MapOutputCopier copier = new MapOutputCopier(conf, reporter);
				copiers.add(copier);
				copier.start();
			}

			// start the on-disk-merge thread
			localFSMergerThread = new LocalFSMerger(
					(LocalFileSystem) localFileSys);
			// start the in memory merger thread
			inMemFSMergeThread = new InMemFSMergeThread();
			localFSMergerThread.start();
			inMemFSMergeThread.start();

			// start the map events thread
			getMapEventsThread = new GetMapEventsThread();
			getMapEventsThread.start();

			// start the clock for bandwidth measurement
			long startTime = System.currentTimeMillis();
			long currentTime = startTime;
			long lastProgressTime = startTime;
			long lastOutputTime = 0;

			// loop until we get all required outputs
			while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {

				currentTime = System.currentTimeMillis();
				boolean logNow = false;
				if (currentTime - lastOutputTime > MIN_LOG_TIME) {
					lastOutputTime = currentTime;
					logNow = true;
				}
				if (logNow) {
					LOG.info(reduceTask.getTaskID() + " Need another "
							+ (numMaps - copiedMapOutputs.size())
							+ " map output(s), with " + numInFlight
							+ " already in progress");
				}

				// Put the hash entries for the failed fetches.
				Iterator<MapOutputLocation> locItr = retryFetches.iterator();

				while (locItr.hasNext()) {
					MapOutputLocation loc = locItr.next();
					List<MapOutputLocation> locList = mapLocations.get(loc
							.getHost());

					// Check if the list exists. The map-output location
					// mapping is cleared once the jobtracker restarts and is
					// rebuilt from scratch, so we continue in the hope of
					// finding some locations in the rebuilt map.
					if (locList != null) {
						// Add to the beginning of the list so that this map is
						// tried again before the others and we can hasten the
						// re-execution of this map should there be a problem
						locList.add(0, loc);
					}
				}

				if (retryFetches.size() > 0) {
					LOG.info(reduceTask.getTaskID() + ": " + "Got "
							+ retryFetches.size()
							+ " map-outputs from previous failures");
				}
				// clear the "failed" fetches hashmap
				retryFetches.clear();

				// now walk through the cache and schedule what we can
				int numScheduled = 0;
				int numDups = 0;

				synchronized (scheduledCopies) {

					// Randomize the map output locations to prevent
					// all reduce-tasks swamping the same tasktracker
					List<String> hostList = new ArrayList<String>();
					hostList.addAll(mapLocations.keySet());

					Collections.shuffle(hostList, this.random);

					Iterator<String> hostsItr = hostList.iterator();

					while (hostsItr.hasNext()) {

						String host = hostsItr.next();

						List<MapOutputLocation> knownOutputsByLoc = mapLocations
								.get(host);

						// Check if the list exists. The map-output location
						// mapping is cleared once the jobtracker restarts and
						// is rebuilt from scratch, so we continue in the hope
						// of finding some locations in the rebuilt map and
						// adding them for fetching.
						if (knownOutputsByLoc == null
								|| knownOutputsByLoc.size() == 0) {
							continue;
						}

						// Identify duplicate hosts here
						if (uniqueHosts.contains(host)) {
							numDups += knownOutputsByLoc.size();
							continue;
						}

						Long penaltyEnd = penaltyBox.get(host);
						boolean penalized = false;

						if (penaltyEnd != null) {
							if (currentTime < penaltyEnd.longValue()) {
								penalized = true;
							} else {
								penaltyBox.remove(host);
							}
						}

						if (penalized)
							continue;

						synchronized (knownOutputsByLoc) {

							locItr = knownOutputsByLoc.iterator();

							while (locItr.hasNext()) {

								MapOutputLocation loc = locItr.next();

								// Do not schedule fetches from OBSOLETE maps
								if (obsoleteMapIds.contains(loc
										.getTaskAttemptId())) {
									locItr.remove();
									continue;
								}

								uniqueHosts.add(host);
								scheduledCopies.add(loc);
								locItr.remove(); // remove from knownOutputs
								numInFlight++;
								numScheduled++;

								break; // we have a map from this host
							}
						}
					}
					scheduledCopies.notifyAll();
				}

				if (numScheduled > 0 || logNow) {
					LOG.info(reduceTask.getTaskID() + " Scheduled "
							+ numScheduled + " outputs (" + penaltyBox.size()
							+ " slow hosts and" + numDups + " dup hosts)");
				}

				if (penaltyBox.size() > 0 && logNow) {
					LOG.info("Penalized(slow) Hosts: ");
					for (String host : penaltyBox.keySet()) {
						LOG.info(host + " Will be considered after: "
								+ ((penaltyBox.get(host) - currentTime) / 1000)
								+ " seconds.");
					}
				}

				// if we have no copies in flight and we can't schedule anything
				// new, just wait for a bit
				try {
					if (numInFlight == 0 && numScheduled == 0) {
						// we should indicate progress, as we don't want the
						// tasktracker to think we're stuck and kill us
						reporter.progress();
						Thread.sleep(5000);
					}
				} catch (InterruptedException e) {
				} // IGNORE

				while (numInFlight > 0 && mergeThrowable == null) {
					LOG.debug(reduceTask.getTaskID() + " numInFlight = "
							+ numInFlight);
					// the call to getCopyResult will either
					// 1) return immediately with a null or a valid CopyResult
					//    object, or
					// 2) if numInFlight is above maxInFlight, return with a
					//    CopyResult object after getting a notification from
					//    a fetcher thread.
					// So when getCopyResult returns null, we can be sure we
					// aren't busy enough and should go get more map-completion
					// events from the tasktracker
					CopyResult cr = getCopyResult(numInFlight);

					if (cr == null) {
						break;
					}

					if (cr.getSuccess()) { // a successful copy
						numCopied++;
						lastProgressTime = System.currentTimeMillis();
						reduceShuffleBytes.increment(cr.getSize());

						long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
						float mbs = ((float) reduceShuffleBytes.getCounter())
								/ (1024 * 1024);
						float transferRate = mbs / secsSinceStart;

						copyPhase.startNextPhase();
						copyPhase.setStatus("copy (" + numCopied + " of "
								+ numMaps + " at "
								+ mbpsFormat.format(transferRate) + " MB/s)");

						// Note successful fetch for this mapId to invalidate
						// (possibly) old fetch-failures
						fetchFailedMaps.remove(cr.getLocation().getTaskId());
					} else if (cr.isObsolete()) {
						// ignore

						// Note successful fetch for this mapId to invalidate
						// (possibly) old fetch-failures
						// fetchFailedMaps.remove(cr.getLocation().getTaskId());

						LOG.info(reduceTask.getTaskID()
								+ " Ignoring obsolete copy result for Map Task: "
								+ cr.getLocation().getTaskAttemptId()
								+ " from host: " + cr.getHost());
					} else {
						retryFetches.add(cr.getLocation());

						// note the failed-fetch
						TaskAttemptID mapTaskId = cr.getLocation()
								.getTaskAttemptId();
						TaskID mapId = cr.getLocation().getTaskId();

						totalFailures++;
						Integer noFailedFetches = mapTaskToFailedFetchesMap
								.get(mapTaskId);
						noFailedFetches = (noFailedFetches == null) ? 1
								: (noFailedFetches + 1);
						mapTaskToFailedFetchesMap.put(mapTaskId,
								noFailedFetches);
						LOG.info("Task " + getTaskID() + ": Failed fetch #"
								+ noFailedFetches + " from " + mapTaskId);

						// did the fetch fail too many times?
						// using a hybrid technique for notifying the
						// jobtracker.
						// a. the first notification is sent after max-retries
						// b. subsequent notifications are sent after 2 retries.
						if ((noFailedFetches >= maxFetchRetriesPerMap)
								&& ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
							synchronized (RecoverReducerTask.this) {
								taskStatus.addFetchFailedMap(mapTaskId);
								LOG.info("Failed to fetch map-output from "
										+ mapTaskId
										+ " even after MAX_FETCH_RETRIES_PER_MAP retries;"
										+ " reporting to the JobTracker");
							}
						}
						// note unique failed-fetch maps
						if (noFailedFetches == maxFetchRetriesPerMap) {
							fetchFailedMaps.add(mapId);

							// did we have too many unique failed-fetch maps?
							// and did we fail on too many fetch attempts?
							// and did we progress enough
							// or did we wait for too long without any progress?

							// check if the reducer is healthy
							boolean reducerHealthy = (((float) totalFailures / (totalFailures + numCopied)) < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);

							// check if the reducer has progressed enough
							boolean reducerProgressedEnough = (((float) numCopied / numMaps) >= MIN_REQUIRED_PROGRESS_PERCENT);

							// check if the reducer is stalled for a long time
							// duration for which the reducer is stalled
							int stallDuration = (int) (System
									.currentTimeMillis() - lastProgressTime);
							// duration for which the reducer ran with progress
							int shuffleProgressDuration = (int) (lastProgressTime - startTime);
							// min time the reducer should run without getting
							// killed
							int minShuffleRunDuration = (shuffleProgressDuration > maxMapRuntime) ? shuffleProgressDuration
									: maxMapRuntime;
							boolean reducerStalled = (((float) stallDuration / minShuffleRunDuration) >= MAX_ALLOWED_STALL_TIME_PERCENT);

							// kill if not healthy and has insufficient progress
							if ((fetchFailedMaps.size() >= maxFailedUniqueFetches || fetchFailedMaps
									.size() == (numMaps - copiedMapOutputs
									.size()))
									&& !reducerHealthy
									&& (!reducerProgressedEnough || reducerStalled)) {
								LOG.fatal("Shuffle failed with too many fetch failures "
										+ "and insufficient progress! "
										+ "Killing task " + getTaskID() + ".");
								umbilical.shuffleError(getTaskID(),
										"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
												+ " bailing-out.");
							}
						}

						// back off exponentially until num_retries <= max_retries;
						// back off by max_backoff/2 on subsequent failed attempts
						currentTime = System.currentTimeMillis();
						int currentBackOff = (noFailedFetches <= maxFetchRetriesPerMap)
								? BACKOFF_INIT * (1 << (noFailedFetches - 1))
								: (this.maxBackoff * 1000 / 2);
						penaltyBox.put(cr.getHost(), currentTime
								+ currentBackOff);
						LOG.warn(reduceTask.getTaskID() + " adding host "
								+ cr.getHost()
								+ " to penalty box, next contact in "
								+ (currentBackOff / 1000) + " seconds");
					}
					uniqueHosts.remove(cr.getHost());
					numInFlight--;
				}
			}

			// all done, inform the copiers to exit
			exitGetMapEvents = true;
			try {
				getMapEventsThread.join();
				LOG.info("getMapsEventsThread joined.");
			} catch (InterruptedException ie) {
				LOG.info("getMapsEventsThread threw an exception: "
						+ StringUtils.stringifyException(ie));
			}

			synchronized (copiers) {
				synchronized (scheduledCopies) {
					for (MapOutputCopier copier : copiers) {
						copier.interrupt();
					}
					copiers.clear();
				}
			}

			// copiers are done, exit and notify the waiting merge threads
			synchronized (mapOutputFilesOnDisk) {
				exitLocalFSMerge = true;
				mapOutputFilesOnDisk.notify();
			}

			ramManager.close();

			// Do a merge of in-memory files (if there are any)
			if (mergeThrowable == null) {
				try {
					// Wait for the on-disk merge to complete
					localFSMergerThread.join();
					LOG.info("Interleaved on-disk merge complete: "
							+ mapOutputFilesOnDisk.size() + " files left.");

					// wait for an ongoing merge (if it is in flight) to
					// complete
					inMemFSMergeThread.join();
					LOG.info("In-memory merge complete: "
							+ mapOutputsFilesInMemory.size() + " files left.");
				} catch (InterruptedException ie) {
					LOG.warn(reduceTask.getTaskID()
							+ " Final merge of the inmemory files threw an exception: "
							+ StringUtils.stringifyException(ie));
					// if the last merge itself generated an error, keep it;
					// otherwise record the interruption as the failure cause
					if (mergeThrowable == null) {
						mergeThrowable = ie;
					}
					return false;
				}
			}
			return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
		}

		private long createInMemorySegments(
				List<Segment<K, V>> inMemorySegments, long leaveBytes)
				throws IOException {
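			// Drains in-memory map outputs into inMemorySegments until at
			// most leaveBytes bytes remain in memory, returning the total
			// size of the segments removed; e.g. leaveBytes == 0 moves every
			// in-memory output into the segment list.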
			long totalSize = 0L;
			synchronized (mapOutputsFilesInMemory) {
				// fullSize could come from the RamManager, but files can be
				// closed but not yet present in mapOutputsFilesInMemory
				long fullSize = 0L;
				for (MapOutput mo : mapOutputsFilesInMemory) {
					fullSize += mo.data.length;
				}
				while (fullSize > leaveBytes) {
					MapOutput mo = mapOutputsFilesInMemory.remove(0);
					totalSize += mo.data.length;
					fullSize -= mo.data.length;
					Reader<K, V> reader = new InMemoryReader<K, V>(ramManager,
							mo.mapAttemptId, mo.data, 0, mo.data.length);
					Segment<K, V> segment = new Segment<K, V>(reader, true);
					inMemorySegments.add(segment);
				}
			}
			return totalSize;
		}

		/**
		 * Create a RawKeyValueIterator from copied map outputs. All copying
		 * threads have exited, so all of the map outputs are available either
		 * in memory or on disk. We also know that no merges are in progress,
		 * so synchronization is more lax here.
		 * 
		 * The iterator returned must satisfy the following constraints:
		 *   1. Fewer than io.sort.factor files may be sources
		 *   2. No more than maxInMemReduce bytes of map outputs may be
		 *      resident in memory when the reduce begins
		 * 
		 * If we must perform an intermediate merge to satisfy (1), then we can
		 * keep the excluded outputs from (2) in memory and include them in the
		 * first merge pass. If not, then said outputs must be written to disk
		 * first.
		 */
		@SuppressWarnings("unchecked")
		private RawKeyValueIterator createKVIterator(JobConf job,
				FileSystem fs, Reporter reporter) throws IOException {

			// merge config params
			Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
			Class<V> valueClass = (Class<V>) job.getMapOutputValueClass();
			boolean keepInputs = job.getKeepFailedTaskFiles();
			final Path tmpDir = new Path(getTaskID().toString());
			final RawComparator<K> comparator = (RawComparator<K>) job
					.getOutputKeyComparator();

			// segments required to vacate memory
			List<Segment<K, V>> memDiskSegments = new ArrayList<Segment<K, V>>();
			long inMemToDiskBytes = 0;
			if (mapOutputsFilesInMemory.size() > 0) {
				TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
				inMemToDiskBytes = createInMemorySegments(memDiskSegments,
						maxInMemReduce);
				final int numMemDiskSegments = memDiskSegments.size();
				if (numMemDiskSegments > 0
						&& ioSortFactor > mapOutputFilesOnDisk.size()) {
					// must spill to disk, but can't retain in-mem for
					// intermediate merge
					final Path outputPath = mapOutputFile.getInputFileForWrite(
							mapId, reduceTask.getTaskID(), inMemToDiskBytes,
							round);
					final RawKeyValueIterator rIter = Merger.merge(job, fs,
							keyClass, valueClass, memDiskSegments,
							numMemDiskSegments, tmpDir, comparator, reporter,
							spilledRecordsCounter, null);
					final Writer writer = new Writer(job, fs, outputPath,
							keyClass, valueClass, codec, null);
					try {
						Merger.writeFile(rIter, writer, reporter, job);
						addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
					} catch (Exception e) {
						if (null != outputPath) {
							fs.delete(outputPath, true);
						}
						throw new IOException("Final merge failed", e);
					} finally {
						if (null != writer) {
							writer.close();
						}
					}
					LOG.info("Merged " + numMemDiskSegments + " segments, "
							+ inMemToDiskBytes + " bytes to disk to satisfy "
							+ "reduce memory limit");
					inMemToDiskBytes = 0;
					memDiskSegments.clear();
				} else if (inMemToDiskBytes != 0) {
					LOG.info("Keeping " + numMemDiskSegments + " segments, "
							+ inMemToDiskBytes + " bytes in memory for "
							+ "intermediate, on-disk merge");
				}
			}

			// segments on disk
			List<Segment<K, V>> diskSegments = new ArrayList<Segment<K, V>>();
			long onDiskBytes = inMemToDiskBytes;
			Path[] onDisk = getMapFiles(fs, false);
			for (Path file : onDisk) {
				onDiskBytes += fs.getFileStatus(file).getLen();
				diskSegments.add(new Segment<K, V>(job, fs, file, codec,
						keepInputs));
			}
			LOG.info("Merging " + onDisk.length + " files, " + onDiskBytes
					+ " bytes from disk");
			Collections.sort(diskSegments, new Comparator<Segment<K, V>>() {
				public int compare(Segment<K, V> o1, Segment<K, V> o2) {
					if (o1.getLength() == o2.getLength()) {
						return 0;
					}
					return o1.getLength() < o2.getLength() ? -1 : 1;
				}
			});

			// build the final list of segments: the merged on-disk data plus
			// whatever is still in memory
			List<Segment<K, V>> finalSegments = new ArrayList<Segment<K, V>>();
			long inMemBytes = createInMemorySegments(finalSegments, 0);
			LOG.info("Merging " + finalSegments.size() + " segments, "
					+ inMemBytes + " bytes from memory into reduce");
			if (0 != onDiskBytes) {
				final int numInMemSegments = memDiskSegments.size();
				diskSegments.addAll(0, memDiskSegments);
				memDiskSegments.clear();
				RawKeyValueIterator diskMerge = Merger.merge(job, fs, keyClass,
						valueClass, codec, diskSegments, ioSortFactor,
						numInMemSegments, tmpDir, comparator, reporter, false,
						spilledRecordsCounter, null);
				diskSegments.clear();
				if (0 == finalSegments.size()) {
					return diskMerge;
				}
				finalSegments.add(new Segment<K, V>(new RawKVIteratorReader(
						diskMerge, onDiskBytes), true));
			}
			return Merger.merge(job, fs, keyClass, valueClass, finalSegments,
					finalSegments.size(), tmpDir, comparator, reporter,
					spilledRecordsCounter, null);
		}

		class RawKVIteratorReader extends IFile.Reader<K, V> {
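
			// This reader adapts a RawKeyValueIterator (here, the output of
			// the on-disk merge in createKVIterator) to the IFile.Reader
			// interface, so the merged disk data can be wrapped in a Segment
			// and take part in the final merge alongside the in-memory
			// segments.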

			private final RawKeyValueIterator kvIter;

			public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
					throws IOException {
				super(null, null, size, null, spilledRecordsCounter);
				this.kvIter = kvIter;
			}

			public boolean next(DataInputBuffer key, DataInputBuffer value)
					throws IOException {
				if (kvIter.next()) {
					final DataInputBuffer kb = kvIter.getKey();
					final DataInputBuffer vb = kvIter.getValue();
					final int kp = kb.getPosition();
					final int klen = kb.getLength() - kp;
					key.reset(kb.getData(), kp, klen);
					final int vp = vb.getPosition();
					final int vlen = vb.getLength() - vp;
					value.reset(vb.getData(), vp, vlen);
					bytesRead += klen + vlen;
					return true;
				}
				return false;
			}

			public long getPosition() throws IOException {
				return bytesRead;
			}

			public void close() throws IOException {
				kvIter.close();
			}
		}

		private CopyResult getCopyResult(int numInFlight) {
			synchronized (copyResults) {
				while (copyResults.isEmpty()) {
					try {
						// The idea is that if we have scheduled enough, we can
						// wait until we hear from one of the copiers.
						if (busyEnough(numInFlight)) {
							copyResults.wait();
						} else {
							return null;
						}
					} catch (InterruptedException e) {
					}
				}
				return copyResults.remove(0);
			}
		}

		private void addToMapOutputFilesOnDisk(FileStatus status) {
			synchronized (mapOutputFilesOnDisk) {
				mapOutputFilesOnDisk.add(status);
				mapOutputFilesOnDisk.notify();
			}
		}

		/**
		 * Starts merging the local copy (on disk) of the map's output so that
		 * most of the reducer's input is sorted, i.e. the shuffle and merge
		 * phases overlap.
		 */
		private class LocalFSMerger extends Thread {
			private LocalFileSystem localFileSys;

			public LocalFSMerger(LocalFileSystem fs) {
				this.localFileSys = fs;
				setName("Thread for merging on-disk files");
				setDaemon(true);
			}

			@SuppressWarnings("unchecked")
			public void run() {
				try {
					LOG.info(reduceTask.getTaskID() + " Thread started: "
							+ getName());
					while (!exitLocalFSMerge) {
						synchronized (mapOutputFilesOnDisk) {
							while (!exitLocalFSMerge
									&& mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
								LOG.info(reduceTask.getTaskID()
										+ " Thread waiting: " + getName());
								mapOutputFilesOnDisk.wait();
							}
						}
						// to avoid running one extra time at the end
						if (exitLocalFSMerge) {
							break;
						}
						List<Path> mapFiles = new ArrayList<Path>();
						long approxOutputSize = 0;
						int bytesPerSum = reduceTask.getConf().getInt(
								"io.bytes.per.checksum", 512);
						LOG.info(reduceTask.getTaskID() + " We have "
								+ mapOutputFilesOnDisk.size()
								+ " map outputs on disk. "
								+ "Triggering merge of " + ioSortFactor
								+ " files");
						// 1. Prepare the list of files to be merged. This list
						// is prepared using the list of map output files on
						// disk. Currently we merge io.sort.factor files into 1.
						synchronized (mapOutputFilesOnDisk) {
							for (int i = 0; i < ioSortFactor; ++i) {
								FileStatus filestatus = mapOutputFilesOnDisk
										.first();
								mapOutputFilesOnDisk.remove(filestatus);
								mapFiles.add(filestatus.getPath());
								approxOutputSize += filestatus.getLen();
							}
						}

						// sanity check
						if (mapFiles.size() == 0) {
							return;
						}

						// add the checksum length
						approxOutputSize += ChecksumFileSystem
								.getChecksumLength(approxOutputSize,
										bytesPerSum);

						// 2. Start the on-disk merge process
						Path outputPath = lDirAlloc.getLocalPathForWrite(
								mapFiles.get(0).toString(), approxOutputSize,
								conf).suffix(".merged");
						Writer writer = new Writer(conf, rfs, outputPath, conf
								.getMapOutputKeyClass(), conf
								.getMapOutputValueClass(), codec, null);
						RawKeyValueIterator iter = null;
						Path tmpDir = new Path(reduceTask.getTaskID()
								.toString());
						try {
							iter = Merger.merge(conf, rfs, conf
									.getMapOutputKeyClass(), conf
									.getMapOutputValueClass(), codec, mapFiles
									.toArray(new Path[mapFiles.size()]), true,
									ioSortFactor, tmpDir, conf
											.getOutputKeyComparator(),
									reporter, spilledRecordsCounter, null);

							Merger.writeFile(iter, writer, reporter, conf);
							writer.close();
						} catch (Exception e) {
							localFileSys.delete(outputPath, true);
							throw new IOException(StringUtils
									.stringifyException(e));
						}

						synchronized (mapOutputFilesOnDisk) {
							addToMapOutputFilesOnDisk(localFileSys
									.getFileStatus(outputPath));
						}

						LOG.info(reduceTask.getTaskID()
								+ " Finished merging "
								+ mapFiles.size()
								+ " map output files on disk of total-size "
								+ approxOutputSize
								+ "."
								+ " Local output file is "
								+ outputPath
								+ " of size "
								+ localFileSys.getFileStatus(outputPath)
										.getLen());
					}
				} catch (Exception e) {
					LOG.warn(reduceTask.getTaskID()
							+ " Merging of the local FS files threw an exception: "
							+ StringUtils.stringifyException(e));
					if (mergeThrowable == null) {
						mergeThrowable = e;
					}
				} catch (Throwable t) {
					String msg = getTaskID()
							+ " : Failed to merge on the local FS"
							+ StringUtils.stringifyException(t);
					reportFatalError(getTaskID(), t, msg);
				}
			}
		}

		private class InMemFSMergeThread extends Thread {

			public InMemFSMergeThread() {
				setName("Thread for merging in memory files");
				setDaemon(true);
			}

			public void run() {
				LOG.info(reduceTask.getTaskID() + " Thread started: "
						+ getName());
				try {
					boolean exit = false;
					do {
						exit = ramManager.waitForDataToMerge();
						if (!exit) {
							doInMemMerge();
						}
					} while (!exit);
				} catch (Exception e) {
					LOG.warn(reduceTask.getTaskID()
							+ " Merge of the inmemory files threw an exception: "
							+ StringUtils.stringifyException(e));
					ReduceCopier.this.mergeThrowable = e;
				} catch (Throwable t) {
					String msg = getTaskID() + " : Failed to merge in memory"
							+ StringUtils.stringifyException(t);
					reportFatalError(getTaskID(), t, msg);
				}
			}

			@SuppressWarnings("unchecked")
			private void doInMemMerge() throws IOException {
				if (mapOutputsFilesInMemory.size() == 0) {
					return;
				}

				// Name this output file the same as the first file in the
				// current list of in-memory files (this name is guaranteed to
				// be absent on disk, so we don't overwrite a previously
				// created spill). Also, we need to create the output file now,
				// since it is not guaranteed that this file will be present
				// after the merge is called (we delete empty files as soon as
				// we see them in the merge method).
				// figure out the mapId
				TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;

				List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
				long mergeOutputSize = createInMemorySegments(inMemorySegments,
						0);
				int noInMemorySegments = inMemorySegments.size();

				Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
						reduceTask.getTaskID(), mergeOutputSize, round);

				Writer writer = new Writer(conf, rfs, outputPath, conf
						.getMapOutputKeyClass(), conf.getMapOutputValueClass(),
						codec, null);

				RawKeyValueIterator rIter = null;
				try {
					LOG.info("Initiating in-memory merge with "
							+ noInMemorySegments + " segments...");

					rIter = Merger.merge(conf, rfs, (Class<K>) conf
							.getMapOutputKeyClass(), (Class<V>) conf
							.getMapOutputValueClass(), inMemorySegments,
							inMemorySegments.size(), new Path(reduceTask
									.getTaskID().toString()), conf
									.getOutputKeyComparator(), reporter,
							spilledRecordsCounter, null);

					if (combinerRunner == null) {
						Merger.writeFile(rIter, writer, reporter, conf);
					} else {
						combineCollector.setWriter(writer);
						combinerRunner.combine(rIter, combineCollector);
					}
					writer.close();

					LOG.info(reduceTask.getTaskID() + " Merge of the "
							+ noInMemorySegments + " files in-memory complete."
							+ " Local file is " + outputPath + " of size "
							+ localFileSys.getFileStatus(outputPath).getLen());
				} catch (Exception e) {
					// make sure that we delete the on-disk file that we
					// created earlier for the merge output
					localFileSys.delete(outputPath, true);
					throw (IOException) new IOException(
							"Intermediate merge failed").initCause(e);
				}

				// Note the output of the merge
				FileStatus status = localFileSys.getFileStatus(outputPath);
				synchronized (mapOutputFilesOnDisk) {
					addToMapOutputFilesOnDisk(status);
				}
			}
		}

		private class GetMapEventsThread extends Thread {

			private IntWritable fromEventId = new IntWritable(0);
			private static final long SLEEP_TIME = 1000;

			public GetMapEventsThread() {
				setName("Thread for polling Map Completion Events");
				setDaemon(true);
			}

			@Override
			public void run() {

				LOG.info(reduceTask.getTaskID() + " Thread started: "
						+ getName());

				do {
					try {
						// LOG.info("get events from " + fromEventId.get());
						int numNewMaps = getMapCompletionEvents();
						if (numNewMaps > 0) {
							LOG.info(reduceTask.getTaskID() + ": " + "Got "
									+ numNewMaps + " new map-outputs");
						}
						Thread.sleep(SLEEP_TIME);
					} catch (InterruptedException e) {
						LOG.warn(reduceTask.getTaskID()
								+ " GetMapEventsThread returning after an"
								+ " interrupted exception");
						return;
					} catch (Throwable t) {
						String msg = reduceTask.getTaskID()
								+ " GetMapEventsThread Ignoring exception : "
								+ StringUtils.stringifyException(t);
						reportFatalError(getTaskID(), t, msg);
					}
				} while (!exitGetMapEvents);

				LOG.info("GetMapEventsThread exiting");

			}

			private boolean called = false;

			/**
			 * Builds the map-output locations from the recorded map
			 * schedules. Unlike the standard implementation, this does not
			 * poll the {@link TaskTracker}; the locations are populated only
			 * once (see the <code>called</code> flag).
			 * 
			 * @return the number of new map-output locations added
			 * @throws IOException
			 */
			private int getMapCompletionEvents() throws IOException {

				int numNewMaps = 0;

				if (!called) {

					for (MapScheduleInfo msi : mapSchedules) {

						URI u = URI.create(msi.getHttpHost());
						String host = u.getHost();
						LOG.info("Map output host: " + host);
						TaskAttemptID taskId = msi.getTaskAttemptID();

						// TODO: maximum retries are not yet considered here

						URL mapOutputLocation = new URL(msi.getHttpHost()
								+ "/mapOutput?job=" + taskId.getJobID()
								+ "&map=" + taskId + "&reduce="
								+ getPartition() + "&iteration=" + round);
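						// Resulting URL shape (hypothetical values):
						// http://host:50060/mapOutput?job=<jobid>&map=<attemptid>
						//     &reduce=<partition>&iteration=<round>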
						LOG.info("Recover-copy address: "
								+ mapOutputLocation.toString());
						List<MapOutputLocation> loc = mapLocations.get(host);
						if (loc == null) {
							loc = Collections
									.synchronizedList(new LinkedList<MapOutputLocation>());
							mapLocations.put(host, loc);
						}

						loc.add(new MapOutputLocation(taskId, host,
								mapOutputLocation));
						numNewMaps++;
					}

					// this method should only populate the locations once
					called = true;
				}

				return numNewMaps;
			}
		}
	}

	/**
	 * Return the exponent of the power of two closest to the given positive
	 * value (an IllegalArgumentException is thrown if value <= 0). This
	 * follows the observation that the msb of a given value is also the
	 * closest power of two, unless the bit following it is set.
	 */
	private static int getClosestPowerOf2(int value) {
		if (value <= 0)
			throw new IllegalArgumentException("Undefined for " + value);
		final int hob = Integer.highestOneBit(value);
		return Integer.numberOfTrailingZeros(hob)
				+ (((hob >>> 1) & value) == 0 ? 0 : 1);
	}
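
	// Worked example for getClosestPowerOf2 (illustrative): 76 is binary
	// 1001100, so highestOneBit(76) = 64 = 2^6 and the bit after the msb
	// (32) is clear, giving 6; for 96 (binary 1100000) that bit is set, so
	// the result rounds up to 7.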
}
